- 启动MySQL
- 创建Maxwell的数据库和用户
- 在MySQL中创建一个测试数据库和表
前3步详细步骤见实时数仓之Maxwell读取MySQL binlog日志
启动Zookeeper
1 | [hadoop@hadoop001 ~]$ cd $ZK_HOME/bin |
启动Kafka,并创建主题为Maxwell的topic
1 | [hadoop@hadoop001 bin]$ cd $KAFKA_HOME |
启动Kafka的消费者,检查数据到没到位
1 | [hadoop@hadoop001 kafka]$ bin/kafka-console-consumer.sh --zookeeper 192.168.137.2:2181/kafka --topic maxwell --from-beginning |
启动Maxwell进程
1 | /先检查maxwell是否支持kafka-0.10.0.1 |
MySQL中update一条数据
1 | mysql> update emp set sal=502 where empno=6001; |
查看Kafka的消费者窗口
1 | //对应第一条insert语句 |
数据已经正常过来了
Maxwell的过滤功能
参考过滤配置:http://maxwells-daemon.io/filtering/
1 | [root@hadoop001 maxwell-1.17.1]# bin/maxwell --user='maxwell' --password='maxwell' \ |
--filter 'exclude: *.*, include:hlwtest.emp1'
的意思是只监控hlwtest.emp1
表的变化,其他的都不监控
1 | //MySQL中update数据 |
1 | //Kafka消费者接收到的数据 |
确实只消费到了emp1表的update语句,而没有接收到emp表的更新
Maxwell 的bootstrap
1 | mysql> insert into maxwell.bootstrap (database_name, table_name) values ("hlwtest", "emp"); |
Kafka的消费者窗口:
1 | {"database":"maxwell","table":"bootstrap","type":"insert","ts":1552102248,"xid":1555,"commit":true,"data":{"id":1,"database_name":"hlwtest","table_name":"emp","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}} |